package com.tpvlog.drc.server.node.master;

import com.tpvlog.drc.server.config.Configuration;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

/**
 * 集群内部的master节点之间进行网络通信的管理组件
 * <p>
 * 1、跟其他的master节点建立网络连接，避免出现重复的连接
 * 2、在底层基于队列和线程，帮助我们发送请求给其他的机器节点
 * 3、同上，需要接受其他节点发送过来的请求，交给我们来进行业务逻辑上的处理
 */
public class MasterNetworkManager {

    /**
     * 连接超时时间
     */
    public static final Integer CONNECT_TIMEOUT = 5000;

    /**
     * 等待id排在这个节点id之后的master节点来连接自己
     */
    public void waitAfterMasterNodesConnect() {
        new MasterConnectionListener().start();
    }

    /**
     * 主动去连接id排在自己id之前的master节点
     *
     * @return
     */
    public Boolean connectBeforeMasterNodes() {
        // 获取id比自己小的那些master节点
        List<String> beforeMasterNodes = getBeforeMasterNodes();
        if (beforeMasterNodes.size() == 0) {
            return true;
        }

        // 跟那些id比自己小的master节点建立网络连接
        for (String beforeMasterNode : beforeMasterNodes) {
            try {
                connectBeforeMasterNode(beforeMasterNode);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        return true;
    }

    /**
     * 连接id比自己小的master节点
     *
     * @param beforeMasterNode
     * @return
     */
    private Boolean connectBeforeMasterNode(String beforeMasterNode) throws IOException {
        // 获取id比自己小的master节点的ip地址和端口号
        String[] beforeMasterNodeSplited = beforeMasterNode.split(":");
        String ip = beforeMasterNodeSplited[1];
        int port = Integer.valueOf(beforeMasterNodeSplited[2]);

        // 封装一下id比自己小的那个master节点的网络地址
        InetSocketAddress endpoint = new InetSocketAddress(ip, port);

        // 封装Socket网络连接，对id比自己小的master节点发起连接请求
        Socket socket = new Socket();
        socket.setTcpNoDelay(true);
        socket.setSoTimeout(0);
        socket.connect(endpoint, CONNECT_TIMEOUT);

        // 为这个建立成功的网络连接启动读写IO线程
        new MasterNetworkWriteThread(socket).start();
        new MasterNetworkReadThread(socket).start();

        return true;
    }

    /**
     * 获取id比子小的master节点列表
     *
     * @return
     */
    private List<String> getBeforeMasterNodes() {
        List<String> beforeMasterNodes = new ArrayList<String>();

        Configuration configuration = Configuration.getInstance();

        Integer myNodeId = configuration.getNodeId();

        String masterNodeServers = configuration.getMasterNodeServers();
        String[] masterNodeServersSplited = masterNodeServers.split(",");

        for (String masterNodeServer : masterNodeServersSplited) {
            String[] masterNodeServerSplited = masterNodeServer.split(":");
            Integer nodeId = Integer.valueOf(masterNodeServerSplited[0]);
            if (nodeId < myNodeId) {
                beforeMasterNodes.add(masterNodeServer);
            }
        }

        return beforeMasterNodes;
    }

}
